tokio\net\windows/named_pipe.rs
1//! Tokio support for [Windows named pipes].
2//!
3//! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5use std::ffi::c_void;
6use std::ffi::OsStr;
7use std::io::{self, Read, Write};
8use std::pin::Pin;
9use std::ptr;
10use std::ptr::null_mut;
11use std::task::{Context, Poll};
12
13use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
14use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
15
16cfg_io_util! {
17 use bytes::BufMut;
18}
19
20// Hide imports which are not used when generating documentation.
#[cfg(windows)]
mod doc {
    // Windows-only `OsStr` extension trait, re-exported so the parent module
    // picks it up through `use self::doc::*`.
    pub(super) use crate::os::windows::ffi::OsStrExt;
    // Local `windows_sys` module that flattens the Win32 namespaces listed
    // below into a single path.
    pub(super) mod windows_sys {
        pub(crate) use windows_sys::{
            Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
            Win32::System::SystemServices::*,
        };
    }
    // mio's Windows-specific module, aliased to the name used in this file
    // (e.g. `mio_windows::NamedPipe`).
    pub(super) use mio::windows as mio_windows;
}
32
33// NB: none of these shows up in public API, so don't document them.
#[cfg(not(windows))]
mod doc {
    // Stand-in for `mio::windows` when not compiling for Windows, so that
    // `mio_windows::NamedPipe` still resolves to a type.
    pub(super) mod mio_windows {
        // Placeholder type; the real `NamedPipe` is only available on Windows.
        pub type NamedPipe = crate::doc::NotDefinedHere;
    }
}
40
41use self::doc::*;
42
43/// A [Windows named pipe] server.
44///
45/// Accepting client connections involves creating a server with
46/// [`ServerOptions::create`] and waiting for clients to connect using
47/// [`NamedPipeServer::connect`].
48///
49/// To avoid having clients sporadically fail with
50/// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
51/// ensure that at least one server instance is available at all times. This
52/// means that the typical listen loop for a server is a bit involved, because
53/// we have to ensure that we never drop a server accidentally while a client
54/// might connect.
55///
56/// So a correctly implemented server looks like this:
57///
58/// ```no_run
59/// use std::io;
60/// use tokio::net::windows::named_pipe::ServerOptions;
61///
62/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
63///
64/// # #[tokio::main] async fn main() -> std::io::Result<()> {
65/// // The first server needs to be constructed early so that clients can
66/// // be correctly connected. Otherwise calling .wait will cause the client to
67/// // error.
68/// //
69/// // Here we also make use of `first_pipe_instance`, which will ensure that
70/// // there are no other servers up and running already.
71/// let mut server = ServerOptions::new()
72/// .first_pipe_instance(true)
73/// .create(PIPE_NAME)?;
74///
75/// // Spawn the server loop.
76/// let server = tokio::spawn(async move {
77/// loop {
78/// // Wait for a client to connect.
79/// server.connect().await?;
80/// let connected_client = server;
81///
82/// // Construct the next server to be connected before sending the one
/// // we already have off onto a task. This ensures that the server
84/// // isn't closed (after it's done in the task) before a new one is
85/// // available. Otherwise the client might error with
86/// // `io::ErrorKind::NotFound`.
87/// server = ServerOptions::new().create(PIPE_NAME)?;
88///
89/// let client = tokio::spawn(async move {
90/// /* use the connected client */
91/// # Ok::<_, std::io::Error>(())
92/// });
93/// # if true { break } // needed for type inference to work
94/// }
95///
96/// Ok::<_, io::Error>(())
97/// });
98///
99/// /* do something else not server related here */
100/// # Ok(()) }
101/// ```
102///
103/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeServer {
    // The mio named pipe wrapped in `PollEvented`; every method on this type
    // goes through this field for both the raw pipe operations and the
    // readiness registration.
    io: PollEvented<mio_windows::NamedPipe>,
}
108
109impl NamedPipeServer {
110 /// Constructs a new named pipe server from the specified raw handle.
111 ///
112 /// This function will consume ownership of the handle given, passing
113 /// responsibility for closing the handle to the returned object.
114 ///
115 /// This function is also unsafe as the primitives currently returned have
116 /// the contract that they are the sole owner of the file descriptor they
117 /// are wrapping. Usage of this function could accidentally allow violating
118 /// this contract which can cause memory unsafety in code that relies on it
119 /// being true.
120 ///
121 /// # Errors
122 ///
123 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
124 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
125 ///
126 /// [Tokio Runtime]: crate::runtime::Runtime
127 /// [enabled I/O]: crate::runtime::Builder::enable_io
128 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
129 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
130
131 Ok(Self {
132 io: PollEvented::new(named_pipe)?,
133 })
134 }
135
136 /// Retrieves information about the named pipe the server is associated
137 /// with.
138 ///
139 /// ```no_run
140 /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
141 ///
142 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
143 ///
144 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
145 /// let server = ServerOptions::new()
146 /// .pipe_mode(PipeMode::Message)
147 /// .max_instances(5)
148 /// .create(PIPE_NAME)?;
149 ///
150 /// let server_info = server.info()?;
151 ///
152 /// assert_eq!(server_info.end, PipeEnd::Server);
153 /// assert_eq!(server_info.mode, PipeMode::Message);
154 /// assert_eq!(server_info.max_instances, 5);
155 /// # Ok(()) }
156 /// ```
157 pub fn info(&self) -> io::Result<PipeInfo> {
158 // Safety: we're ensuring the lifetime of the named pipe.
159 unsafe { named_pipe_info(self.io.as_raw_handle()) }
160 }
161
162 /// Enables a named pipe server process to wait for a client process to
163 /// connect to an instance of a named pipe. A client process connects by
164 /// creating a named pipe with the same name.
165 ///
166 /// This corresponds to the [`ConnectNamedPipe`] system call.
167 ///
168 /// # Cancel safety
169 ///
170 /// This method is cancellation safe in the sense that if it is used as the
171 /// event in a [`select!`](crate::select) statement and some other branch
172 /// completes first, then no connection events have been lost.
173 ///
174 /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
175 ///
176 /// # Example
177 ///
178 /// ```no_run
179 /// use tokio::net::windows::named_pipe::ServerOptions;
180 ///
181 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
182 ///
183 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
184 /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
185 ///
186 /// // Wait for a client to connect.
187 /// pipe.connect().await?;
188 ///
189 /// // Use the connected client...
190 /// # Ok(()) }
191 /// ```
192 pub async fn connect(&self) -> io::Result<()> {
193 match self.io.connect() {
194 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
195 self.io
196 .registration()
197 .async_io(Interest::WRITABLE, || self.io.connect())
198 .await
199 }
200 x => x,
201 }
202 }
203
204 /// Disconnects the server end of a named pipe instance from a client
205 /// process.
206 ///
207 /// ```
208 /// use tokio::io::AsyncWriteExt;
209 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
210 /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
211 ///
212 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
213 ///
214 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
215 /// let server = ServerOptions::new()
216 /// .create(PIPE_NAME)?;
217 ///
218 /// let mut client = ClientOptions::new()
219 /// .open(PIPE_NAME)?;
220 ///
221 /// // Wait for a client to become connected.
222 /// server.connect().await?;
223 ///
224 /// // Forcibly disconnect the client.
225 /// server.disconnect()?;
226 ///
227 /// // Write fails with an OS-specific error after client has been
228 /// // disconnected.
229 /// let e = client.write(b"ping").await.unwrap_err();
230 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
231 /// # Ok(()) }
232 /// ```
233 pub fn disconnect(&self) -> io::Result<()> {
234 self.io.disconnect()
235 }
236
237 /// Waits for any of the requested ready states.
238 ///
239 /// This function is usually paired with `try_read()` or `try_write()`. It
240 /// can be used to concurrently read / write to the same pipe on a single
241 /// task without splitting the pipe.
242 ///
243 /// The function may complete without the pipe being ready. This is a
244 /// false-positive and attempting an operation will return with
245 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
246 /// [`Ready`] set, so you should always check the returned value and possibly
247 /// wait again if the requested states are not set.
248 ///
249 /// # Examples
250 ///
251 /// Concurrently read and write to the pipe on the same task without
252 /// splitting.
253 ///
254 /// ```no_run
255 /// use tokio::io::Interest;
256 /// use tokio::net::windows::named_pipe;
257 /// use std::error::Error;
258 /// use std::io;
259 ///
260 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
261 ///
262 /// #[tokio::main]
263 /// async fn main() -> Result<(), Box<dyn Error>> {
264 /// let server = named_pipe::ServerOptions::new()
265 /// .create(PIPE_NAME)?;
266 ///
267 /// loop {
268 /// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
269 ///
270 /// if ready.is_readable() {
271 /// let mut data = vec![0; 1024];
272 /// // Try to read data, this may still fail with `WouldBlock`
273 /// // if the readiness event is a false positive.
274 /// match server.try_read(&mut data) {
275 /// Ok(n) => {
276 /// println!("read {} bytes", n);
277 /// }
278 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
279 /// continue;
280 /// }
281 /// Err(e) => {
282 /// return Err(e.into());
283 /// }
284 /// }
285 /// }
286 ///
287 /// if ready.is_writable() {
288 /// // Try to write data, this may still fail with `WouldBlock`
289 /// // if the readiness event is a false positive.
290 /// match server.try_write(b"hello world") {
291 /// Ok(n) => {
292 /// println!("write {} bytes", n);
293 /// }
294 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
295 /// continue;
296 /// }
297 /// Err(e) => {
298 /// return Err(e.into());
299 /// }
300 /// }
301 /// }
302 /// }
303 /// }
304 /// ```
305 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
306 let event = self.io.registration().readiness(interest).await?;
307 Ok(event.ready)
308 }
309
310 /// Waits for the pipe to become readable.
311 ///
312 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
313 /// paired with `try_read()`.
314 ///
315 /// # Examples
316 ///
317 /// ```no_run
318 /// use tokio::net::windows::named_pipe;
319 /// use std::error::Error;
320 /// use std::io;
321 ///
322 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
323 ///
324 /// #[tokio::main]
325 /// async fn main() -> Result<(), Box<dyn Error>> {
326 /// let server = named_pipe::ServerOptions::new()
327 /// .create(PIPE_NAME)?;
328 ///
329 /// let mut msg = vec![0; 1024];
330 ///
331 /// loop {
332 /// // Wait for the pipe to be readable
333 /// server.readable().await?;
334 ///
335 /// // Try to read data, this may still fail with `WouldBlock`
336 /// // if the readiness event is a false positive.
337 /// match server.try_read(&mut msg) {
338 /// Ok(n) => {
339 /// msg.truncate(n);
340 /// break;
341 /// }
342 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
343 /// continue;
344 /// }
345 /// Err(e) => {
346 /// return Err(e.into());
347 /// }
348 /// }
349 /// }
350 ///
351 /// println!("GOT = {:?}", msg);
352 /// Ok(())
353 /// }
354 /// ```
355 pub async fn readable(&self) -> io::Result<()> {
356 self.ready(Interest::READABLE).await?;
357 Ok(())
358 }
359
360 /// Polls for read readiness.
361 ///
362 /// If the pipe is not currently ready for reading, this method will
363 /// store a clone of the `Waker` from the provided `Context`. When the pipe
364 /// becomes ready for reading, `Waker::wake` will be called on the waker.
365 ///
366 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
367 /// the `Waker` from the `Context` passed to the most recent call is
368 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
369 /// second, independent waker.)
370 ///
371 /// This function is intended for cases where creating and pinning a future
372 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
373 /// preferred, as this supports polling from multiple tasks at once.
374 ///
375 /// # Return value
376 ///
377 /// The function returns:
378 ///
379 /// * `Poll::Pending` if the pipe is not ready for reading.
380 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
381 /// * `Poll::Ready(Err(e))` if an error is encountered.
382 ///
383 /// # Errors
384 ///
385 /// This function may encounter any standard I/O error except `WouldBlock`.
386 ///
387 /// [`readable`]: method@Self::readable
388 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
389 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
390 }
391
392 /// Tries to read data from the pipe into the provided buffer, returning how
393 /// many bytes were read.
394 ///
395 /// Receives any pending data from the pipe but does not wait for new data
396 /// to arrive. On success, returns the number of bytes read. Because
397 /// `try_read()` is non-blocking, the buffer does not have to be stored by
398 /// the async task and can exist entirely on the stack.
399 ///
400 /// Usually, [`readable()`] or [`ready()`] is used with this function.
401 ///
402 /// [`readable()`]: NamedPipeServer::readable()
403 /// [`ready()`]: NamedPipeServer::ready()
404 ///
405 /// # Return
406 ///
407 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
408 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
409 ///
410 /// 1. The pipe's read half is closed and will no longer yield data.
411 /// 2. The specified buffer was 0 bytes in length.
412 ///
413 /// If the pipe is not ready to read data,
414 /// `Err(io::ErrorKind::WouldBlock)` is returned.
415 ///
416 /// # Examples
417 ///
418 /// ```no_run
419 /// use tokio::net::windows::named_pipe;
420 /// use std::error::Error;
421 /// use std::io;
422 ///
423 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
424 ///
425 /// #[tokio::main]
426 /// async fn main() -> Result<(), Box<dyn Error>> {
427 /// let server = named_pipe::ServerOptions::new()
428 /// .create(PIPE_NAME)?;
429 ///
430 /// loop {
431 /// // Wait for the pipe to be readable
432 /// server.readable().await?;
433 ///
434 /// // Creating the buffer **after** the `await` prevents it from
435 /// // being stored in the async task.
436 /// let mut buf = [0; 4096];
437 ///
438 /// // Try to read data, this may still fail with `WouldBlock`
439 /// // if the readiness event is a false positive.
440 /// match server.try_read(&mut buf) {
441 /// Ok(0) => break,
442 /// Ok(n) => {
443 /// println!("read {} bytes", n);
444 /// }
445 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
446 /// continue;
447 /// }
448 /// Err(e) => {
449 /// return Err(e.into());
450 /// }
451 /// }
452 /// }
453 ///
454 /// Ok(())
455 /// }
456 /// ```
457 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
458 self.io
459 .registration()
460 .try_io(Interest::READABLE, || (&*self.io).read(buf))
461 }
462
463 /// Tries to read data from the pipe into the provided buffers, returning
464 /// how many bytes were read.
465 ///
466 /// Data is copied to fill each buffer in order, with the final buffer
467 /// written to possibly being only partially filled. This method behaves
468 /// equivalently to a single call to [`try_read()`] with concatenated
469 /// buffers.
470 ///
471 /// Receives any pending data from the pipe but does not wait for new data
472 /// to arrive. On success, returns the number of bytes read. Because
473 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
474 /// stored by the async task and can exist entirely on the stack.
475 ///
476 /// Usually, [`readable()`] or [`ready()`] is used with this function.
477 ///
478 /// [`try_read()`]: NamedPipeServer::try_read()
479 /// [`readable()`]: NamedPipeServer::readable()
480 /// [`ready()`]: NamedPipeServer::ready()
481 ///
482 /// # Return
483 ///
484 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
485 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
486 /// and will no longer yield data. If the pipe is not ready to read data
487 /// `Err(io::ErrorKind::WouldBlock)` is returned.
488 ///
489 /// # Examples
490 ///
491 /// ```no_run
492 /// use tokio::net::windows::named_pipe;
493 /// use std::error::Error;
494 /// use std::io::{self, IoSliceMut};
495 ///
496 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
497 ///
498 /// #[tokio::main]
499 /// async fn main() -> Result<(), Box<dyn Error>> {
500 /// let server = named_pipe::ServerOptions::new()
501 /// .create(PIPE_NAME)?;
502 ///
503 /// loop {
504 /// // Wait for the pipe to be readable
505 /// server.readable().await?;
506 ///
507 /// // Creating the buffer **after** the `await` prevents it from
508 /// // being stored in the async task.
509 /// let mut buf_a = [0; 512];
510 /// let mut buf_b = [0; 1024];
511 /// let mut bufs = [
512 /// IoSliceMut::new(&mut buf_a),
513 /// IoSliceMut::new(&mut buf_b),
514 /// ];
515 ///
516 /// // Try to read data, this may still fail with `WouldBlock`
517 /// // if the readiness event is a false positive.
518 /// match server.try_read_vectored(&mut bufs) {
519 /// Ok(0) => break,
520 /// Ok(n) => {
521 /// println!("read {} bytes", n);
522 /// }
523 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
524 /// continue;
525 /// }
526 /// Err(e) => {
527 /// return Err(e.into());
528 /// }
529 /// }
530 /// }
531 ///
532 /// Ok(())
533 /// }
534 /// ```
535 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
536 self.io
537 .registration()
538 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
539 }
540
541 cfg_io_util! {
 /// Tries to read data from the pipe into the provided buffer, advancing the
543 /// buffer's internal cursor, returning how many bytes were read.
544 ///
545 /// Receives any pending data from the pipe but does not wait for new data
546 /// to arrive. On success, returns the number of bytes read. Because
547 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
548 /// the async task and can exist entirely on the stack.
549 ///
550 /// Usually, [`readable()`] or [`ready()`] is used with this function.
551 ///
552 /// [`readable()`]: NamedPipeServer::readable()
553 /// [`ready()`]: NamedPipeServer::ready()
554 ///
555 /// # Return
556 ///
557 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
 /// and will no longer yield data. If the pipe is not ready to read data
560 /// `Err(io::ErrorKind::WouldBlock)` is returned.
561 ///
562 /// # Examples
563 ///
564 /// ```no_run
565 /// use tokio::net::windows::named_pipe;
566 /// use std::error::Error;
567 /// use std::io;
568 ///
569 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
570 ///
571 /// #[tokio::main]
572 /// async fn main() -> Result<(), Box<dyn Error>> {
573 /// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
574 ///
575 /// loop {
576 /// // Wait for the pipe to be readable
577 /// server.readable().await?;
578 ///
579 /// let mut buf = Vec::with_capacity(4096);
580 ///
581 /// // Try to read data, this may still fail with `WouldBlock`
582 /// // if the readiness event is a false positive.
583 /// match server.try_read_buf(&mut buf) {
584 /// Ok(0) => break,
585 /// Ok(n) => {
586 /// println!("read {} bytes", n);
587 /// }
588 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
589 /// continue;
590 /// }
591 /// Err(e) => {
592 /// return Err(e.into());
593 /// }
594 /// }
595 /// }
596 ///
597 /// Ok(())
598 /// }
599 /// ```
600 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
601 self.io.registration().try_io(Interest::READABLE, || {
602 use std::io::Read;
603
604 let dst = buf.chunk_mut();
605 let dst =
606 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
607
608 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
609 // buffer.
610 let n = (&*self.io).read(dst)?;
611
612 unsafe {
613 buf.advance_mut(n);
614 }
615
616 Ok(n)
617 })
618 }
619 }
620
621 /// Waits for the pipe to become writable.
622 ///
623 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
624 /// paired with `try_write()`.
625 ///
626 /// # Examples
627 ///
628 /// ```no_run
629 /// use tokio::net::windows::named_pipe;
630 /// use std::error::Error;
631 /// use std::io;
632 ///
633 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
634 ///
635 /// #[tokio::main]
636 /// async fn main() -> Result<(), Box<dyn Error>> {
637 /// let server = named_pipe::ServerOptions::new()
638 /// .create(PIPE_NAME)?;
639 ///
640 /// loop {
641 /// // Wait for the pipe to be writable
642 /// server.writable().await?;
643 ///
644 /// // Try to write data, this may still fail with `WouldBlock`
645 /// // if the readiness event is a false positive.
646 /// match server.try_write(b"hello world") {
647 /// Ok(n) => {
648 /// break;
649 /// }
650 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
651 /// continue;
652 /// }
653 /// Err(e) => {
654 /// return Err(e.into());
655 /// }
656 /// }
657 /// }
658 ///
659 /// Ok(())
660 /// }
661 /// ```
662 pub async fn writable(&self) -> io::Result<()> {
663 self.ready(Interest::WRITABLE).await?;
664 Ok(())
665 }
666
667 /// Polls for write readiness.
668 ///
669 /// If the pipe is not currently ready for writing, this method will
670 /// store a clone of the `Waker` from the provided `Context`. When the pipe
671 /// becomes ready for writing, `Waker::wake` will be called on the waker.
672 ///
673 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
674 /// the `Waker` from the `Context` passed to the most recent call is
675 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
676 /// second, independent waker.)
677 ///
678 /// This function is intended for cases where creating and pinning a future
679 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
680 /// preferred, as this supports polling from multiple tasks at once.
681 ///
682 /// # Return value
683 ///
684 /// The function returns:
685 ///
686 /// * `Poll::Pending` if the pipe is not ready for writing.
687 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
688 /// * `Poll::Ready(Err(e))` if an error is encountered.
689 ///
690 /// # Errors
691 ///
692 /// This function may encounter any standard I/O error except `WouldBlock`.
693 ///
694 /// [`writable`]: method@Self::writable
695 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
696 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
697 }
698
699 /// Tries to write a buffer to the pipe, returning how many bytes were
700 /// written.
701 ///
702 /// The function will attempt to write the entire contents of `buf`, but
703 /// only part of the buffer may be written.
704 ///
705 /// This function is usually paired with `writable()`.
706 ///
707 /// # Return
708 ///
709 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
710 /// number of bytes written. If the pipe is not ready to write data,
711 /// `Err(io::ErrorKind::WouldBlock)` is returned.
712 ///
713 /// # Examples
714 ///
715 /// ```no_run
716 /// use tokio::net::windows::named_pipe;
717 /// use std::error::Error;
718 /// use std::io;
719 ///
720 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
721 ///
722 /// #[tokio::main]
723 /// async fn main() -> Result<(), Box<dyn Error>> {
724 /// let server = named_pipe::ServerOptions::new()
725 /// .create(PIPE_NAME)?;
726 ///
727 /// loop {
728 /// // Wait for the pipe to be writable
729 /// server.writable().await?;
730 ///
731 /// // Try to write data, this may still fail with `WouldBlock`
732 /// // if the readiness event is a false positive.
733 /// match server.try_write(b"hello world") {
734 /// Ok(n) => {
735 /// break;
736 /// }
737 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
738 /// continue;
739 /// }
740 /// Err(e) => {
741 /// return Err(e.into());
742 /// }
743 /// }
744 /// }
745 ///
746 /// Ok(())
747 /// }
748 /// ```
749 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
750 self.io
751 .registration()
752 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
753 }
754
755 /// Tries to write several buffers to the pipe, returning how many bytes
756 /// were written.
757 ///
758 /// Data is written from each buffer in order, with the final buffer read
 /// from possibly being only partially consumed. This method behaves
760 /// equivalently to a single call to [`try_write()`] with concatenated
761 /// buffers.
762 ///
763 /// This function is usually paired with `writable()`.
764 ///
765 /// [`try_write()`]: NamedPipeServer::try_write()
766 ///
767 /// # Return
768 ///
769 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
770 /// number of bytes written. If the pipe is not ready to write data,
771 /// `Err(io::ErrorKind::WouldBlock)` is returned.
772 ///
773 /// # Examples
774 ///
775 /// ```no_run
776 /// use tokio::net::windows::named_pipe;
777 /// use std::error::Error;
778 /// use std::io;
779 ///
780 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
781 ///
782 /// #[tokio::main]
783 /// async fn main() -> Result<(), Box<dyn Error>> {
784 /// let server = named_pipe::ServerOptions::new()
785 /// .create(PIPE_NAME)?;
786 ///
787 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
788 ///
789 /// loop {
790 /// // Wait for the pipe to be writable
791 /// server.writable().await?;
792 ///
793 /// // Try to write data, this may still fail with `WouldBlock`
794 /// // if the readiness event is a false positive.
795 /// match server.try_write_vectored(&bufs) {
796 /// Ok(n) => {
797 /// break;
798 /// }
799 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
800 /// continue;
801 /// }
802 /// Err(e) => {
803 /// return Err(e.into());
804 /// }
805 /// }
806 /// }
807 ///
808 /// Ok(())
809 /// }
810 /// ```
811 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
812 self.io
813 .registration()
814 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
815 }
816
817 /// Tries to read or write from the pipe using a user-provided IO operation.
818 ///
819 /// If the pipe is ready, the provided closure is called. The closure
820 /// should attempt to perform IO operation from the pipe by manually
821 /// calling the appropriate syscall. If the operation fails because the
822 /// pipe is not actually ready, then the closure should return a
823 /// `WouldBlock` error and the readiness flag is cleared. The return value
824 /// of the closure is then returned by `try_io`.
825 ///
826 /// If the pipe is not ready, then the closure is not called
827 /// and a `WouldBlock` error is returned.
828 ///
829 /// The closure should only return a `WouldBlock` error if it has performed
830 /// an IO operation on the pipe that failed due to the pipe not being
831 /// ready. Returning a `WouldBlock` error in any other situation will
832 /// incorrectly clear the readiness flag, which can cause the pipe to
833 /// behave incorrectly.
834 ///
835 /// The closure should not perform the IO operation using any of the
836 /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
837 /// the readiness flag and can cause the pipe to behave incorrectly.
838 ///
839 /// This method is not intended to be used with combined interests.
840 /// The closure should perform only one type of IO operation, so it should not
841 /// require more than one ready state. This method may panic or sleep forever
842 /// if it is called with a combined interest.
843 ///
844 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
845 ///
846 /// [`readable()`]: NamedPipeServer::readable()
847 /// [`writable()`]: NamedPipeServer::writable()
848 /// [`ready()`]: NamedPipeServer::ready()
849 pub fn try_io<R>(
850 &self,
851 interest: Interest,
852 f: impl FnOnce() -> io::Result<R>,
853 ) -> io::Result<R> {
854 self.io.registration().try_io(interest, f)
855 }
856
857 /// Reads or writes from the pipe using a user-provided IO operation.
858 ///
859 /// The readiness of the pipe is awaited and when the pipe is ready,
860 /// the provided closure is called. The closure should attempt to perform
861 /// IO operation on the pipe by manually calling the appropriate syscall.
862 /// If the operation fails because the pipe is not actually ready,
863 /// then the closure should return a `WouldBlock` error. In such case the
864 /// readiness flag is cleared and the pipe readiness is awaited again.
865 /// This loop is repeated until the closure returns an `Ok` or an error
866 /// other than `WouldBlock`.
867 ///
868 /// The closure should only return a `WouldBlock` error if it has performed
869 /// an IO operation on the pipe that failed due to the pipe not being
870 /// ready. Returning a `WouldBlock` error in any other situation will
871 /// incorrectly clear the readiness flag, which can cause the pipe to
872 /// behave incorrectly.
873 ///
874 /// The closure should not perform the IO operation using any of the methods
875 /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
876 /// readiness flag and can cause the pipe to behave incorrectly.
877 ///
878 /// This method is not intended to be used with combined interests.
879 /// The closure should perform only one type of IO operation, so it should not
880 /// require more than one ready state. This method may panic or sleep forever
881 /// if it is called with a combined interest.
882 pub async fn async_io<R>(
883 &self,
884 interest: Interest,
885 f: impl FnMut() -> io::Result<R>,
886 ) -> io::Result<R> {
887 self.io.registration().async_io(interest, f).await
888 }
889}
890
891impl AsyncRead for NamedPipeServer {
892 fn poll_read(
893 self: Pin<&mut Self>,
894 cx: &mut Context<'_>,
895 buf: &mut ReadBuf<'_>,
896 ) -> Poll<io::Result<()>> {
897 unsafe { self.io.poll_read(cx, buf) }
898 }
899}
900
901impl AsyncWrite for NamedPipeServer {
902 fn poll_write(
903 self: Pin<&mut Self>,
904 cx: &mut Context<'_>,
905 buf: &[u8],
906 ) -> Poll<io::Result<usize>> {
907 self.io.poll_write(cx, buf)
908 }
909
910 fn poll_write_vectored(
911 self: Pin<&mut Self>,
912 cx: &mut Context<'_>,
913 bufs: &[io::IoSlice<'_>],
914 ) -> Poll<io::Result<usize>> {
915 self.io.poll_write_vectored(cx, bufs)
916 }
917
918 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
919 Poll::Ready(Ok(()))
920 }
921
922 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
923 self.poll_flush(cx)
924 }
925}
926
927impl AsRawHandle for NamedPipeServer {
928 fn as_raw_handle(&self) -> RawHandle {
929 self.io.as_raw_handle()
930 }
931}
932
933impl AsHandle for NamedPipeServer {
934 fn as_handle(&self) -> BorrowedHandle<'_> {
935 unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
936 }
937}
938
939/// A [Windows named pipe] client.
940///
941/// Constructed using [`ClientOptions::open`].
942///
943/// Connecting a client correctly involves a few steps. When connecting through
944/// [`ClientOptions::open`], it might error indicating one of two things:
945///
946/// * [`std::io::ErrorKind::NotFound`] - There is no server available.
947/// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
948/// for a while and try again.
949///
950/// So a correctly implemented client looks like this:
951///
952/// ```no_run
953/// use std::time::Duration;
954/// use tokio::net::windows::named_pipe::ClientOptions;
955/// use tokio::time;
956/// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
957///
958/// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
959///
960/// # #[tokio::main] async fn main() -> std::io::Result<()> {
961/// let client = loop {
962/// match ClientOptions::new().open(PIPE_NAME) {
963/// Ok(client) => break client,
964/// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
965/// Err(e) => return Err(e),
966/// }
967///
968/// time::sleep(Duration::from_millis(50)).await;
969/// };
970///
971/// /* use the connected client */
972/// # Ok(()) }
973/// ```
974///
975/// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
976/// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeClient {
    // The mio named pipe wrapped in `PollEvented`. The readiness, `try_*` and
    // `poll_*` methods of this type all delegate to it.
    io: PollEvented<mio_windows::NamedPipe>,
}
981
982impl NamedPipeClient {
983 /// Constructs a new named pipe client from the specified raw handle.
984 ///
985 /// This function will consume ownership of the handle given, passing
986 /// responsibility for closing the handle to the returned object.
987 ///
988 /// This function is also unsafe as the primitives currently returned have
    /// the contract that they are the sole owner of the handle they
990 /// are wrapping. Usage of this function could accidentally allow violating
991 /// this contract which can cause memory unsafety in code that relies on it
992 /// being true.
993 ///
994 /// # Errors
995 ///
996 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
997 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
998 ///
999 /// [Tokio Runtime]: crate::runtime::Runtime
1000 /// [enabled I/O]: crate::runtime::Builder::enable_io
1001 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1002 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1003
1004 Ok(Self {
1005 io: PollEvented::new(named_pipe)?,
1006 })
1007 }
1008
1009 /// Retrieves information about the named pipe the client is associated
1010 /// with.
1011 ///
1012 /// ```no_run
1013 /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1014 ///
1015 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1016 ///
1017 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1018 /// let client = ClientOptions::new()
1019 /// .open(PIPE_NAME)?;
1020 ///
1021 /// let client_info = client.info()?;
1022 ///
1023 /// assert_eq!(client_info.end, PipeEnd::Client);
1024 /// assert_eq!(client_info.mode, PipeMode::Message);
1025 /// assert_eq!(client_info.max_instances, 5);
1026 /// # Ok(()) }
1027 /// ```
1028 pub fn info(&self) -> io::Result<PipeInfo> {
1029 // Safety: we're ensuring the lifetime of the named pipe.
1030 unsafe { named_pipe_info(self.io.as_raw_handle()) }
1031 }
1032
1033 /// Waits for any of the requested ready states.
1034 ///
1035 /// This function is usually paired with `try_read()` or `try_write()`. It
1036 /// can be used to concurrently read / write to the same pipe on a single
1037 /// task without splitting the pipe.
1038 ///
1039 /// The function may complete without the pipe being ready. This is a
1040 /// false-positive and attempting an operation will return with
1041 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1042 /// [`Ready`] set, so you should always check the returned value and possibly
1043 /// wait again if the requested states are not set.
1044 ///
1045 /// # Examples
1046 ///
1047 /// Concurrently read and write to the pipe on the same task without
1048 /// splitting.
1049 ///
1050 /// ```no_run
1051 /// use tokio::io::Interest;
1052 /// use tokio::net::windows::named_pipe;
1053 /// use std::error::Error;
1054 /// use std::io;
1055 ///
1056 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1057 ///
1058 /// #[tokio::main]
1059 /// async fn main() -> Result<(), Box<dyn Error>> {
1060 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1061 ///
1062 /// loop {
1063 /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1064 ///
1065 /// if ready.is_readable() {
1066 /// let mut data = vec![0; 1024];
1067 /// // Try to read data, this may still fail with `WouldBlock`
1068 /// // if the readiness event is a false positive.
1069 /// match client.try_read(&mut data) {
1070 /// Ok(n) => {
1071 /// println!("read {} bytes", n);
1072 /// }
1073 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1074 /// continue;
1075 /// }
1076 /// Err(e) => {
1077 /// return Err(e.into());
1078 /// }
1079 /// }
1080 /// }
1081 ///
1082 /// if ready.is_writable() {
1083 /// // Try to write data, this may still fail with `WouldBlock`
1084 /// // if the readiness event is a false positive.
1085 /// match client.try_write(b"hello world") {
1086 /// Ok(n) => {
1087 /// println!("write {} bytes", n);
1088 /// }
1089 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1090 /// continue;
1091 /// }
1092 /// Err(e) => {
1093 /// return Err(e.into());
1094 /// }
1095 /// }
1096 /// }
1097 /// }
1098 /// }
1099 /// ```
1100 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1101 let event = self.io.registration().readiness(interest).await?;
1102 Ok(event.ready)
1103 }
1104
1105 /// Waits for the pipe to become readable.
1106 ///
1107 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1108 /// paired with `try_read()`.
1109 ///
1110 /// # Examples
1111 ///
1112 /// ```no_run
1113 /// use tokio::net::windows::named_pipe;
1114 /// use std::error::Error;
1115 /// use std::io;
1116 ///
1117 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1118 ///
1119 /// #[tokio::main]
1120 /// async fn main() -> Result<(), Box<dyn Error>> {
1121 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1122 ///
1123 /// let mut msg = vec![0; 1024];
1124 ///
1125 /// loop {
1126 /// // Wait for the pipe to be readable
1127 /// client.readable().await?;
1128 ///
1129 /// // Try to read data, this may still fail with `WouldBlock`
1130 /// // if the readiness event is a false positive.
1131 /// match client.try_read(&mut msg) {
1132 /// Ok(n) => {
1133 /// msg.truncate(n);
1134 /// break;
1135 /// }
1136 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1137 /// continue;
1138 /// }
1139 /// Err(e) => {
1140 /// return Err(e.into());
1141 /// }
1142 /// }
1143 /// }
1144 ///
1145 /// println!("GOT = {:?}", msg);
1146 /// Ok(())
1147 /// }
1148 /// ```
1149 pub async fn readable(&self) -> io::Result<()> {
1150 self.ready(Interest::READABLE).await?;
1151 Ok(())
1152 }
1153
1154 /// Polls for read readiness.
1155 ///
1156 /// If the pipe is not currently ready for reading, this method will
1157 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1158 /// becomes ready for reading, `Waker::wake` will be called on the waker.
1159 ///
1160 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1161 /// the `Waker` from the `Context` passed to the most recent call is
1162 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1163 /// second, independent waker.)
1164 ///
1165 /// This function is intended for cases where creating and pinning a future
1166 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1167 /// preferred, as this supports polling from multiple tasks at once.
1168 ///
1169 /// # Return value
1170 ///
1171 /// The function returns:
1172 ///
1173 /// * `Poll::Pending` if the pipe is not ready for reading.
1174 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1175 /// * `Poll::Ready(Err(e))` if an error is encountered.
1176 ///
1177 /// # Errors
1178 ///
1179 /// This function may encounter any standard I/O error except `WouldBlock`.
1180 ///
1181 /// [`readable`]: method@Self::readable
1182 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1183 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1184 }
1185
1186 /// Tries to read data from the pipe into the provided buffer, returning how
1187 /// many bytes were read.
1188 ///
1189 /// Receives any pending data from the pipe but does not wait for new data
1190 /// to arrive. On success, returns the number of bytes read. Because
1191 /// `try_read()` is non-blocking, the buffer does not have to be stored by
1192 /// the async task and can exist entirely on the stack.
1193 ///
1194 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1195 ///
1196 /// [`readable()`]: NamedPipeClient::readable()
1197 /// [`ready()`]: NamedPipeClient::ready()
1198 ///
1199 /// # Return
1200 ///
1201 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1202 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1203 ///
1204 /// 1. The pipe's read half is closed and will no longer yield data.
1205 /// 2. The specified buffer was 0 bytes in length.
1206 ///
1207 /// If the pipe is not ready to read data,
1208 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1209 ///
1210 /// # Examples
1211 ///
1212 /// ```no_run
1213 /// use tokio::net::windows::named_pipe;
1214 /// use std::error::Error;
1215 /// use std::io;
1216 ///
1217 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1218 ///
1219 /// #[tokio::main]
1220 /// async fn main() -> Result<(), Box<dyn Error>> {
1221 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1222 ///
1223 /// loop {
1224 /// // Wait for the pipe to be readable
1225 /// client.readable().await?;
1226 ///
1227 /// // Creating the buffer **after** the `await` prevents it from
1228 /// // being stored in the async task.
1229 /// let mut buf = [0; 4096];
1230 ///
1231 /// // Try to read data, this may still fail with `WouldBlock`
1232 /// // if the readiness event is a false positive.
1233 /// match client.try_read(&mut buf) {
1234 /// Ok(0) => break,
1235 /// Ok(n) => {
1236 /// println!("read {} bytes", n);
1237 /// }
1238 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1239 /// continue;
1240 /// }
1241 /// Err(e) => {
1242 /// return Err(e.into());
1243 /// }
1244 /// }
1245 /// }
1246 ///
1247 /// Ok(())
1248 /// }
1249 /// ```
1250 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1251 self.io
1252 .registration()
1253 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1254 }
1255
1256 /// Tries to read data from the pipe into the provided buffers, returning
1257 /// how many bytes were read.
1258 ///
1259 /// Data is copied to fill each buffer in order, with the final buffer
1260 /// written to possibly being only partially filled. This method behaves
1261 /// equivalently to a single call to [`try_read()`] with concatenated
1262 /// buffers.
1263 ///
1264 /// Receives any pending data from the pipe but does not wait for new data
1265 /// to arrive. On success, returns the number of bytes read. Because
1266 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1267 /// stored by the async task and can exist entirely on the stack.
1268 ///
1269 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1270 ///
1271 /// [`try_read()`]: NamedPipeClient::try_read()
1272 /// [`readable()`]: NamedPipeClient::readable()
1273 /// [`ready()`]: NamedPipeClient::ready()
1274 ///
1275 /// # Return
1276 ///
1277 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1278 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1279 /// and will no longer yield data. If the pipe is not ready to read data
1280 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1281 ///
1282 /// # Examples
1283 ///
1284 /// ```no_run
1285 /// use tokio::net::windows::named_pipe;
1286 /// use std::error::Error;
1287 /// use std::io::{self, IoSliceMut};
1288 ///
1289 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1290 ///
1291 /// #[tokio::main]
1292 /// async fn main() -> Result<(), Box<dyn Error>> {
1293 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1294 ///
1295 /// loop {
1296 /// // Wait for the pipe to be readable
1297 /// client.readable().await?;
1298 ///
1299 /// // Creating the buffer **after** the `await` prevents it from
1300 /// // being stored in the async task.
1301 /// let mut buf_a = [0; 512];
1302 /// let mut buf_b = [0; 1024];
1303 /// let mut bufs = [
1304 /// IoSliceMut::new(&mut buf_a),
1305 /// IoSliceMut::new(&mut buf_b),
1306 /// ];
1307 ///
1308 /// // Try to read data, this may still fail with `WouldBlock`
1309 /// // if the readiness event is a false positive.
1310 /// match client.try_read_vectored(&mut bufs) {
1311 /// Ok(0) => break,
1312 /// Ok(n) => {
1313 /// println!("read {} bytes", n);
1314 /// }
1315 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1316 /// continue;
1317 /// }
1318 /// Err(e) => {
1319 /// return Err(e.into());
1320 /// }
1321 /// }
1322 /// }
1323 ///
1324 /// Ok(())
1325 /// }
1326 /// ```
1327 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1328 self.io
1329 .registration()
1330 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1331 }
1332
1333 cfg_io_util! {
        /// Tries to read data from the pipe into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
1336 ///
1337 /// Receives any pending data from the pipe but does not wait for new data
1338 /// to arrive. On success, returns the number of bytes read. Because
1339 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1340 /// the async task and can exist entirely on the stack.
1341 ///
1342 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1343 ///
1344 /// [`readable()`]: NamedPipeClient::readable()
1345 /// [`ready()`]: NamedPipeClient::ready()
1346 ///
1347 /// # Return
1348 ///
1349 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
        /// and will no longer yield data. If the pipe is not ready to read data
1352 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1353 ///
1354 /// # Examples
1355 ///
1356 /// ```no_run
1357 /// use tokio::net::windows::named_pipe;
1358 /// use std::error::Error;
1359 /// use std::io;
1360 ///
1361 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1362 ///
1363 /// #[tokio::main]
1364 /// async fn main() -> Result<(), Box<dyn Error>> {
1365 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1366 ///
1367 /// loop {
1368 /// // Wait for the pipe to be readable
1369 /// client.readable().await?;
1370 ///
1371 /// let mut buf = Vec::with_capacity(4096);
1372 ///
1373 /// // Try to read data, this may still fail with `WouldBlock`
1374 /// // if the readiness event is a false positive.
1375 /// match client.try_read_buf(&mut buf) {
1376 /// Ok(0) => break,
1377 /// Ok(n) => {
1378 /// println!("read {} bytes", n);
1379 /// }
1380 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1381 /// continue;
1382 /// }
1383 /// Err(e) => {
1384 /// return Err(e.into());
1385 /// }
1386 /// }
1387 /// }
1388 ///
1389 /// Ok(())
1390 /// }
1391 /// ```
1392 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1393 self.io.registration().try_io(Interest::READABLE, || {
1394 use std::io::Read;
1395
1396 let dst = buf.chunk_mut();
1397 let dst =
1398 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1399
1400 // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1401 // buffer.
1402 let n = (&*self.io).read(dst)?;
1403
1404 unsafe {
1405 buf.advance_mut(n);
1406 }
1407
1408 Ok(n)
1409 })
1410 }
1411 }
1412
1413 /// Waits for the pipe to become writable.
1414 ///
1415 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1416 /// paired with `try_write()`.
1417 ///
1418 /// # Examples
1419 ///
1420 /// ```no_run
1421 /// use tokio::net::windows::named_pipe;
1422 /// use std::error::Error;
1423 /// use std::io;
1424 ///
1425 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1426 ///
1427 /// #[tokio::main]
1428 /// async fn main() -> Result<(), Box<dyn Error>> {
1429 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1430 ///
1431 /// loop {
1432 /// // Wait for the pipe to be writable
1433 /// client.writable().await?;
1434 ///
1435 /// // Try to write data, this may still fail with `WouldBlock`
1436 /// // if the readiness event is a false positive.
1437 /// match client.try_write(b"hello world") {
1438 /// Ok(n) => {
1439 /// break;
1440 /// }
1441 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1442 /// continue;
1443 /// }
1444 /// Err(e) => {
1445 /// return Err(e.into());
1446 /// }
1447 /// }
1448 /// }
1449 ///
1450 /// Ok(())
1451 /// }
1452 /// ```
1453 pub async fn writable(&self) -> io::Result<()> {
1454 self.ready(Interest::WRITABLE).await?;
1455 Ok(())
1456 }
1457
1458 /// Polls for write readiness.
1459 ///
1460 /// If the pipe is not currently ready for writing, this method will
1461 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1462 /// becomes ready for writing, `Waker::wake` will be called on the waker.
1463 ///
1464 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1465 /// the `Waker` from the `Context` passed to the most recent call is
1466 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1467 /// second, independent waker.)
1468 ///
1469 /// This function is intended for cases where creating and pinning a future
1470 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1471 /// preferred, as this supports polling from multiple tasks at once.
1472 ///
1473 /// # Return value
1474 ///
1475 /// The function returns:
1476 ///
1477 /// * `Poll::Pending` if the pipe is not ready for writing.
1478 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1479 /// * `Poll::Ready(Err(e))` if an error is encountered.
1480 ///
1481 /// # Errors
1482 ///
1483 /// This function may encounter any standard I/O error except `WouldBlock`.
1484 ///
1485 /// [`writable`]: method@Self::writable
1486 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1487 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1488 }
1489
1490 /// Tries to write a buffer to the pipe, returning how many bytes were
1491 /// written.
1492 ///
1493 /// The function will attempt to write the entire contents of `buf`, but
1494 /// only part of the buffer may be written.
1495 ///
1496 /// This function is usually paired with `writable()`.
1497 ///
1498 /// # Return
1499 ///
1500 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1501 /// number of bytes written. If the pipe is not ready to write data,
1502 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1503 ///
1504 /// # Examples
1505 ///
1506 /// ```no_run
1507 /// use tokio::net::windows::named_pipe;
1508 /// use std::error::Error;
1509 /// use std::io;
1510 ///
1511 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1512 ///
1513 /// #[tokio::main]
1514 /// async fn main() -> Result<(), Box<dyn Error>> {
1515 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1516 ///
1517 /// loop {
1518 /// // Wait for the pipe to be writable
1519 /// client.writable().await?;
1520 ///
1521 /// // Try to write data, this may still fail with `WouldBlock`
1522 /// // if the readiness event is a false positive.
1523 /// match client.try_write(b"hello world") {
1524 /// Ok(n) => {
1525 /// break;
1526 /// }
1527 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1528 /// continue;
1529 /// }
1530 /// Err(e) => {
1531 /// return Err(e.into());
1532 /// }
1533 /// }
1534 /// }
1535 ///
1536 /// Ok(())
1537 /// }
1538 /// ```
1539 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1540 self.io
1541 .registration()
1542 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1543 }
1544
1545 /// Tries to write several buffers to the pipe, returning how many bytes
1546 /// were written.
1547 ///
    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
1550 /// equivalently to a single call to [`try_write()`] with concatenated
1551 /// buffers.
1552 ///
1553 /// This function is usually paired with `writable()`.
1554 ///
1555 /// [`try_write()`]: NamedPipeClient::try_write()
1556 ///
1557 /// # Return
1558 ///
1559 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1560 /// number of bytes written. If the pipe is not ready to write data,
1561 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1562 ///
1563 /// # Examples
1564 ///
1565 /// ```no_run
1566 /// use tokio::net::windows::named_pipe;
1567 /// use std::error::Error;
1568 /// use std::io;
1569 ///
1570 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1571 ///
1572 /// #[tokio::main]
1573 /// async fn main() -> Result<(), Box<dyn Error>> {
1574 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1575 ///
1576 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1577 ///
1578 /// loop {
1579 /// // Wait for the pipe to be writable
1580 /// client.writable().await?;
1581 ///
1582 /// // Try to write data, this may still fail with `WouldBlock`
1583 /// // if the readiness event is a false positive.
1584 /// match client.try_write_vectored(&bufs) {
1585 /// Ok(n) => {
1586 /// break;
1587 /// }
1588 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1589 /// continue;
1590 /// }
1591 /// Err(e) => {
1592 /// return Err(e.into());
1593 /// }
1594 /// }
1595 /// }
1596 ///
1597 /// Ok(())
1598 /// }
1599 /// ```
1600 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1601 self.io
1602 .registration()
1603 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1604 }
1605
1606 /// Tries to read or write from the pipe using a user-provided IO operation.
1607 ///
1608 /// If the pipe is ready, the provided closure is called. The closure
1609 /// should attempt to perform IO operation from the pipe by manually
1610 /// calling the appropriate syscall. If the operation fails because the
1611 /// pipe is not actually ready, then the closure should return a
1612 /// `WouldBlock` error and the readiness flag is cleared. The return value
1613 /// of the closure is then returned by `try_io`.
1614 ///
1615 /// If the pipe is not ready, then the closure is not called
1616 /// and a `WouldBlock` error is returned.
1617 ///
1618 /// The closure should only return a `WouldBlock` error if it has performed
1619 /// an IO operation on the pipe that failed due to the pipe not being
1620 /// ready. Returning a `WouldBlock` error in any other situation will
1621 /// incorrectly clear the readiness flag, which can cause the pipe to
1622 /// behave incorrectly.
1623 ///
1624 /// The closure should not perform the IO operation using any of the methods
1625 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1626 /// readiness flag and can cause the pipe to behave incorrectly.
1627 ///
1628 /// This method is not intended to be used with combined interests.
1629 /// The closure should perform only one type of IO operation, so it should not
1630 /// require more than one ready state. This method may panic or sleep forever
1631 /// if it is called with a combined interest.
1632 ///
1633 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1634 ///
1635 /// [`readable()`]: NamedPipeClient::readable()
1636 /// [`writable()`]: NamedPipeClient::writable()
1637 /// [`ready()`]: NamedPipeClient::ready()
1638 pub fn try_io<R>(
1639 &self,
1640 interest: Interest,
1641 f: impl FnOnce() -> io::Result<R>,
1642 ) -> io::Result<R> {
1643 self.io.registration().try_io(interest, f)
1644 }
1645
1646 /// Reads or writes from the pipe using a user-provided IO operation.
1647 ///
1648 /// The readiness of the pipe is awaited and when the pipe is ready,
1649 /// the provided closure is called. The closure should attempt to perform
1650 /// IO operation on the pipe by manually calling the appropriate syscall.
1651 /// If the operation fails because the pipe is not actually ready,
1652 /// then the closure should return a `WouldBlock` error. In such case the
1653 /// readiness flag is cleared and the pipe readiness is awaited again.
1654 /// This loop is repeated until the closure returns an `Ok` or an error
1655 /// other than `WouldBlock`.
1656 ///
1657 /// The closure should only return a `WouldBlock` error if it has performed
1658 /// an IO operation on the pipe that failed due to the pipe not being
1659 /// ready. Returning a `WouldBlock` error in any other situation will
1660 /// incorrectly clear the readiness flag, which can cause the pipe to
1661 /// behave incorrectly.
1662 ///
1663 /// The closure should not perform the IO operation using any of the methods
1664 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1665 /// readiness flag and can cause the pipe to behave incorrectly.
1666 ///
1667 /// This method is not intended to be used with combined interests.
1668 /// The closure should perform only one type of IO operation, so it should not
1669 /// require more than one ready state. This method may panic or sleep forever
1670 /// if it is called with a combined interest.
1671 pub async fn async_io<R>(
1672 &self,
1673 interest: Interest,
1674 f: impl FnMut() -> io::Result<R>,
1675 ) -> io::Result<R> {
1676 self.io.registration().async_io(interest, f).await
1677 }
1678}
1679
1680impl AsyncRead for NamedPipeClient {
1681 fn poll_read(
1682 self: Pin<&mut Self>,
1683 cx: &mut Context<'_>,
1684 buf: &mut ReadBuf<'_>,
1685 ) -> Poll<io::Result<()>> {
1686 unsafe { self.io.poll_read(cx, buf) }
1687 }
1688}
1689
1690impl AsyncWrite for NamedPipeClient {
1691 fn poll_write(
1692 self: Pin<&mut Self>,
1693 cx: &mut Context<'_>,
1694 buf: &[u8],
1695 ) -> Poll<io::Result<usize>> {
1696 self.io.poll_write(cx, buf)
1697 }
1698
1699 fn poll_write_vectored(
1700 self: Pin<&mut Self>,
1701 cx: &mut Context<'_>,
1702 bufs: &[io::IoSlice<'_>],
1703 ) -> Poll<io::Result<usize>> {
1704 self.io.poll_write_vectored(cx, bufs)
1705 }
1706
1707 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1708 Poll::Ready(Ok(()))
1709 }
1710
1711 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1712 self.poll_flush(cx)
1713 }
1714}
1715
1716impl AsRawHandle for NamedPipeClient {
1717 fn as_raw_handle(&self) -> RawHandle {
1718 self.io.as_raw_handle()
1719 }
1720}
1721
1722impl AsHandle for NamedPipeClient {
1723 fn as_handle(&self) -> BorrowedHandle<'_> {
1724 unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1725 }
1726}
1727
/// A builder structure for constructing a named pipe with named pipe-specific
/// options. This is required for named pipe servers that want to modify
/// pipe-related options.
1731///
1732/// See [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    // dwOpenMode
    // Whether data may flow from client to server; set by `access_inbound`.
    access_inbound: bool,
    // Whether data may flow from server to client; `new()` starts with `true`.
    access_outbound: bool,
    // NOTE(review): the next four flags all start as `false` in `new()`;
    // presumably each maps to the like-named `dwOpenMode` flag — confirm
    // against `create`.
    first_pipe_instance: bool,
    write_dac: bool,
    write_owner: bool,
    access_system_security: bool,
    // dwPipeMode
    // Selected pipe mode; `new()` starts with `PipeMode::Byte`.
    pipe_mode: PipeMode,
    // `new()` starts with `true`.
    reject_remote_clients: bool,
    // other options
    // `new()` starts with `PIPE_UNLIMITED_INSTANCES`.
    max_instances: u32,
    // Both buffer sizes start at 65536 in `new()`.
    out_buffer_size: u32,
    in_buffer_size: u32,
    // `new()` starts with 0.
    default_timeout: u32,
}
1751
1752impl ServerOptions {
1753 /// Creates a new named pipe builder with the default settings.
1754 ///
1755 /// ```
1756 /// use tokio::net::windows::named_pipe::ServerOptions;
1757 ///
1758 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1759 ///
1760 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1761 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1762 /// # Ok(()) }
1763 /// ```
1764 pub fn new() -> ServerOptions {
1765 ServerOptions {
1766 access_inbound: true,
1767 access_outbound: true,
1768 first_pipe_instance: false,
1769 write_dac: false,
1770 write_owner: false,
1771 access_system_security: false,
1772 pipe_mode: PipeMode::Byte,
1773 reject_remote_clients: true,
1774 max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1775 out_buffer_size: 65536,
1776 in_buffer_size: 65536,
1777 default_timeout: 0,
1778 }
1779 }
1780
1781 /// The pipe mode.
1782 ///
1783 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1784 /// documentation of what each mode means.
1785 ///
1786 /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in [`dwPipeMode`].
1787 ///
1788 /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
1789 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1790 self.pipe_mode = pipe_mode;
1791 self
1792 }
1793
1794 /// The flow of data in the pipe goes from client to server only.
1795 ///
1796 /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1797 ///
1798 /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1799 ///
1800 /// # Errors
1801 ///
1802 /// Server side prevents connecting by denying inbound access, client errors
1803 /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1804 /// the connection.
1805 ///
1806 /// ```
1807 /// use std::io;
1808 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1809 ///
1810 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1811 ///
1812 /// # #[tokio::main] async fn main() -> io::Result<()> {
1813 /// let _server = ServerOptions::new()
1814 /// .access_inbound(false)
1815 /// .create(PIPE_NAME)?;
1816 ///
1817 /// let e = ClientOptions::new()
1818 /// .open(PIPE_NAME)
1819 /// .unwrap_err();
1820 ///
1821 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1822 /// # Ok(()) }
1823 /// ```
1824 ///
1825 /// Disabling writing allows a client to connect, but errors with
1826 /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1827 ///
1828 /// ```
1829 /// use std::io;
1830 /// use tokio::io::AsyncWriteExt;
1831 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1832 ///
1833 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1834 ///
1835 /// # #[tokio::main] async fn main() -> io::Result<()> {
1836 /// let server = ServerOptions::new()
1837 /// .access_inbound(false)
1838 /// .create(PIPE_NAME)?;
1839 ///
1840 /// let mut client = ClientOptions::new()
1841 /// .write(false)
1842 /// .open(PIPE_NAME)?;
1843 ///
1844 /// server.connect().await?;
1845 ///
1846 /// let e = client.write(b"ping").await.unwrap_err();
1847 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1848 /// # Ok(()) }
1849 /// ```
1850 ///
1851 /// # Examples
1852 ///
1853 /// A unidirectional named pipe that only supports server-to-client
1854 /// communication.
1855 ///
1856 /// ```
1857 /// use std::io;
1858 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1859 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1860 ///
1861 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1862 ///
1863 /// # #[tokio::main] async fn main() -> io::Result<()> {
1864 /// let mut server = ServerOptions::new()
1865 /// .access_inbound(false)
1866 /// .create(PIPE_NAME)?;
1867 ///
1868 /// let mut client = ClientOptions::new()
1869 /// .write(false)
1870 /// .open(PIPE_NAME)?;
1871 ///
1872 /// server.connect().await?;
1873 ///
1874 /// let write = server.write_all(b"ping");
1875 ///
1876 /// let mut buf = [0u8; 4];
1877 /// let read = client.read_exact(&mut buf);
1878 ///
1879 /// let ((), read) = tokio::try_join!(write, read)?;
1880 ///
1881 /// assert_eq!(read, 4);
1882 /// assert_eq!(&buf[..], b"ping");
1883 /// # Ok(()) }
1884 /// ```
1885 pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1886 self.access_inbound = allowed;
1887 self
1888 }
1889
1890 /// The flow of data in the pipe goes from server to client only.
1891 ///
1892 /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1893 ///
1894 /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1895 ///
1896 /// # Errors
1897 ///
1898 /// Server side prevents connecting by denying outbound access, client
1899 /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1900 /// create the connection.
1901 ///
1902 /// ```
1903 /// use std::io;
1904 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1905 ///
1906 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1907 ///
1908 /// # #[tokio::main] async fn main() -> io::Result<()> {
1909 /// let server = ServerOptions::new()
1910 /// .access_outbound(false)
1911 /// .create(PIPE_NAME)?;
1912 ///
1913 /// let e = ClientOptions::new()
1914 /// .open(PIPE_NAME)
1915 /// .unwrap_err();
1916 ///
1917 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1918 /// # Ok(()) }
1919 /// ```
1920 ///
1921 /// Disabling reading allows a client to connect, but attempting to read
1922 /// will error with [`std::io::ErrorKind::PermissionDenied`].
1923 ///
1924 /// ```
1925 /// use std::io;
1926 /// use tokio::io::AsyncReadExt;
1927 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1928 ///
1929 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1930 ///
1931 /// # #[tokio::main] async fn main() -> io::Result<()> {
1932 /// let server = ServerOptions::new()
1933 /// .access_outbound(false)
1934 /// .create(PIPE_NAME)?;
1935 ///
1936 /// let mut client = ClientOptions::new()
1937 /// .read(false)
1938 /// .open(PIPE_NAME)?;
1939 ///
1940 /// server.connect().await?;
1941 ///
1942 /// let mut buf = [0u8; 4];
1943 /// let e = client.read(&mut buf).await.unwrap_err();
1944 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1945 /// # Ok(()) }
1946 /// ```
1947 ///
1948 /// # Examples
1949 ///
1950 /// A unidirectional named pipe that only supports client-to-server
1951 /// communication.
1952 ///
1953 /// ```
1954 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1955 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1956 ///
1957 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1958 ///
1959 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1960 /// let mut server = ServerOptions::new()
1961 /// .access_outbound(false)
1962 /// .create(PIPE_NAME)?;
1963 ///
1964 /// let mut client = ClientOptions::new()
1965 /// .read(false)
1966 /// .open(PIPE_NAME)?;
1967 ///
1968 /// server.connect().await?;
1969 ///
1970 /// let write = client.write_all(b"ping");
1971 ///
1972 /// let mut buf = [0u8; 4];
1973 /// let read = server.read_exact(&mut buf);
1974 ///
1975 /// let ((), read) = tokio::try_join!(write, read)?;
1976 ///
1977 /// println!("done reading and writing");
1978 ///
1979 /// assert_eq!(read, 4);
1980 /// assert_eq!(&buf[..], b"ping");
1981 /// # Ok(()) }
1982 /// ```
1983 pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1984 self.access_outbound = allowed;
1985 self
1986 }
1987
1988 /// If you attempt to create multiple instances of a pipe with this flag
1989 /// set, creation of the first server instance succeeds, but creation of any
1990 /// subsequent instances will fail with
1991 /// [`std::io::ErrorKind::PermissionDenied`].
1992 ///
1993 /// This option is intended to be used with servers that want to ensure that
1994 /// they are the only process listening for clients on a given named pipe.
1995 /// This is accomplished by enabling it for the first server instance
1996 /// created in a process.
1997 ///
1998 /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1999 ///
2000 /// # Errors
2001 ///
2002 /// If this option is set and more than one instance of the server for a
2003 /// given named pipe exists, calling [`create`] will fail with
2004 /// [`std::io::ErrorKind::PermissionDenied`].
2005 ///
2006 /// ```
2007 /// use std::io;
2008 /// use tokio::net::windows::named_pipe::ServerOptions;
2009 ///
2010 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2011 ///
2012 /// # #[tokio::main] async fn main() -> io::Result<()> {
2013 /// let server1 = ServerOptions::new()
2014 /// .first_pipe_instance(true)
2015 /// .create(PIPE_NAME)?;
2016 ///
2017 /// // Second server errs, since it's not the first instance.
2018 /// let e = ServerOptions::new()
2019 /// .first_pipe_instance(true)
2020 /// .create(PIPE_NAME)
2021 /// .unwrap_err();
2022 ///
2023 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2024 /// # Ok(()) }
2025 /// ```
2026 ///
2027 /// # Examples
2028 ///
2029 /// ```
2030 /// use std::io;
2031 /// use tokio::net::windows::named_pipe::ServerOptions;
2032 ///
2033 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2034 ///
2035 /// # #[tokio::main] async fn main() -> io::Result<()> {
2036 /// let mut builder = ServerOptions::new();
2037 /// builder.first_pipe_instance(true);
2038 ///
2039 /// let server = builder.create(PIPE_NAME)?;
2040 /// let e = builder.create(PIPE_NAME).unwrap_err();
2041 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2042 /// drop(server);
2043 ///
2044 /// // OK, since we've closed the other instance.
2045 /// let _server2 = builder.create(PIPE_NAME)?;
2046 /// # Ok(()) }
2047 /// ```
2048 ///
2049 /// [`create`]: ServerOptions::create
2050 /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
2051 pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2052 self.first_pipe_instance = first;
2053 self
2054 }
2055
2056 /// Requests permission to modify the pipe's discretionary access control list.
2057 ///
2058 /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2059 ///
2060 /// # Examples
2061 ///
2062 /// ```
2063 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2064 ///
2065 /// use tokio::net::windows::named_pipe::ServerOptions;
2066 /// use windows_sys::{
2067 /// Win32::Foundation::ERROR_SUCCESS,
2068 /// Win32::Security::DACL_SECURITY_INFORMATION,
2069 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2070 /// };
2071 ///
2072 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2073 ///
2074 /// # #[tokio::main] async fn main() -> io::Result<()> {
2075 /// let mut pipe_template = ServerOptions::new();
2076 /// pipe_template.write_dac(true);
2077 /// let pipe = pipe_template.create(PIPE_NAME)?;
2078 ///
2079 /// unsafe {
2080 /// assert_eq!(
2081 /// ERROR_SUCCESS,
2082 /// SetSecurityInfo(
2083 /// pipe.as_raw_handle() as _,
2084 /// SE_KERNEL_OBJECT,
2085 /// DACL_SECURITY_INFORMATION,
2086 /// ptr::null_mut(),
2087 /// ptr::null_mut(),
2088 /// ptr::null_mut(),
2089 /// ptr::null_mut(),
2090 /// )
2091 /// );
2092 /// }
2093 ///
2094 /// # Ok(()) }
2095 /// ```
2096 ///
2097 /// ```
2098 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
2099 ///
2100 /// use tokio::net::windows::named_pipe::ServerOptions;
2101 /// use windows_sys::{
2102 /// Win32::Foundation::ERROR_ACCESS_DENIED,
2103 /// Win32::Security::DACL_SECURITY_INFORMATION,
2104 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2105 /// };
2106 ///
2107 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2108 ///
2109 /// # #[tokio::main] async fn main() -> io::Result<()> {
2110 /// let mut pipe_template = ServerOptions::new();
2111 /// pipe_template.write_dac(false);
2112 /// let pipe = pipe_template.create(PIPE_NAME)?;
2113 ///
2114 /// unsafe {
2115 /// assert_eq!(
2116 /// ERROR_ACCESS_DENIED,
2117 /// SetSecurityInfo(
2118 /// pipe.as_raw_handle() as _,
2119 /// SE_KERNEL_OBJECT,
2120 /// DACL_SECURITY_INFORMATION,
2121 /// ptr::null_mut(),
2122 /// ptr::null_mut(),
2123 /// ptr::null_mut(),
2124 /// ptr::null_mut(),
2125 /// )
2126 /// );
2127 /// }
2128 ///
2129 /// # Ok(()) }
2130 /// ```
2131 ///
2132 /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2133 pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2134 self.write_dac = requested;
2135 self
2136 }
2137
2138 /// Requests permission to modify the pipe's owner.
2139 ///
2140 /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2141 ///
2142 /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2143 pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2144 self.write_owner = requested;
2145 self
2146 }
2147
2148 /// Requests permission to modify the pipe's system access control list.
2149 ///
2150 /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2151 ///
2152 /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2153 pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2154 self.access_system_security = requested;
2155 self
2156 }
2157
2158 /// Indicates whether this server can accept remote clients or not. Remote
2159 /// clients are disabled by default.
2160 ///
2161 /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2162 ///
2163 /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
2164 pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2165 self.reject_remote_clients = reject;
2166 self
2167 }
2168
2169 /// The maximum number of instances that can be created for this pipe. The
2170 /// first instance of the pipe can specify this value; the same number must
2171 /// be specified for other instances of the pipe. Acceptable values are in
2172 /// the range 1 through 254. The default value is unlimited.
2173 ///
2174 /// This corresponds to specifying [`nMaxInstances`].
2175 ///
2176 /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2177 ///
2178 /// # Errors
2179 ///
2180 /// All server instances of a pipe must be created with the same
2181 /// `max_instances` value. Creating an additional server instance with a
2182 /// mismatching value might fail.
2183 ///
2184 /// ```
2185 /// use std::io;
2186 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2187 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2188 ///
2189 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2190 ///
2191 /// # #[tokio::main] async fn main() -> io::Result<()> {
2192 /// let mut server = ServerOptions::new();
2193 /// server.max_instances(2);
2194 ///
2195 /// let s1 = server.create(PIPE_NAME)?;
2196 /// let c1 = ClientOptions::new().open(PIPE_NAME);
2197 ///
2198 /// let s2 = server.create(PIPE_NAME)?;
2199 /// let c2 = ClientOptions::new().open(PIPE_NAME);
2200 ///
2201 /// // Too many servers!
2202 /// let e = server.create(PIPE_NAME).unwrap_err();
2203 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2204 ///
2205 /// // Still too many servers even if we specify a higher value!
2206 /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2207 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2208 /// # Ok(()) }
2209 /// ```
2210 ///
2211 /// # Panics
2212 ///
2213 /// This function will panic if more than 254 instances are specified. If
2214 /// you do not wish to set an instance limit, leave it unspecified.
2215 ///
2216 /// ```should_panic
2217 /// use tokio::net::windows::named_pipe::ServerOptions;
2218 ///
2219 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2220 /// let builder = ServerOptions::new().max_instances(255);
2221 /// # Ok(()) }
2222 /// ```
2223 #[track_caller]
2224 pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2225 assert!(instances < 255, "cannot specify more than 254 instances");
2226 self.max_instances = instances as u32;
2227 self
2228 }
2229
2230 /// The number of bytes to reserve for the output buffer.
2231 ///
2232 /// This corresponds to specifying [`nOutBufferSize`].
2233 ///
2234 /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2235 pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2236 self.out_buffer_size = buffer;
2237 self
2238 }
2239
2240 /// The number of bytes to reserve for the input buffer.
2241 ///
2242 /// This corresponds to specifying [`nInBufferSize`].
2243 ///
2244 /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2245 pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2246 self.in_buffer_size = buffer;
2247 self
2248 }
2249
2250 /// Creates the named pipe identified by `addr` for use as a server.
2251 ///
2252 /// This uses the [`CreateNamedPipe`] function.
2253 ///
2254 /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2255 ///
2256 /// # Errors
2257 ///
2258 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2259 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2260 ///
2261 /// [Tokio Runtime]: crate::runtime::Runtime
2262 /// [enabled I/O]: crate::runtime::Builder::enable_io
2263 ///
2264 /// # Examples
2265 ///
2266 /// ```
2267 /// use tokio::net::windows::named_pipe::ServerOptions;
2268 ///
2269 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2270 ///
2271 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2272 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2273 /// # Ok(()) }
2274 /// ```
2275 pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2276 // Safety: We're calling create_with_security_attributes_raw w/ a null
2277 // pointer which disables it.
2278 unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2279 }
2280
2281 /// Creates the named pipe identified by `addr` for use as a server.
2282 ///
2283 /// This is the same as [`create`] except that it supports providing the raw
2284 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2285 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2286 ///
2287 /// # Errors
2288 ///
2289 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2290 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2291 ///
2292 /// [Tokio Runtime]: crate::runtime::Runtime
2293 /// [enabled I/O]: crate::runtime::Builder::enable_io
2294 ///
2295 /// # Safety
2296 ///
2297 /// The `attrs` argument must either be null or point at a valid instance of
2298 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2299 /// behavior is identical to calling the [`create`] method.
2300 ///
2301 /// [`create`]: ServerOptions::create
2302 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2303 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2304 pub unsafe fn create_with_security_attributes_raw(
2305 &self,
2306 addr: impl AsRef<OsStr>,
2307 attrs: *mut c_void,
2308 ) -> io::Result<NamedPipeServer> {
2309 let addr = encode_addr(addr);
2310
2311 let pipe_mode = {
2312 let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2313 windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2314 } else {
2315 windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2316 };
2317 if self.reject_remote_clients {
2318 mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2319 } else {
2320 mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2321 }
2322 mode
2323 };
2324 let open_mode = {
2325 let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2326 if self.access_inbound {
2327 mode |= windows_sys::PIPE_ACCESS_INBOUND;
2328 }
2329 if self.access_outbound {
2330 mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2331 }
2332 if self.first_pipe_instance {
2333 mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2334 }
2335 if self.write_dac {
2336 mode |= windows_sys::WRITE_DAC;
2337 }
2338 if self.write_owner {
2339 mode |= windows_sys::WRITE_OWNER;
2340 }
2341 if self.access_system_security {
2342 mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2343 }
2344 mode
2345 };
2346
2347 let h = windows_sys::CreateNamedPipeW(
2348 addr.as_ptr(),
2349 open_mode,
2350 pipe_mode,
2351 self.max_instances,
2352 self.out_buffer_size,
2353 self.in_buffer_size,
2354 self.default_timeout,
2355 attrs as *mut _,
2356 );
2357
2358 if h == windows_sys::INVALID_HANDLE_VALUE {
2359 return Err(io::Error::last_os_error());
2360 }
2361
2362 NamedPipeServer::from_raw_handle(h as _)
2363 }
2364}
2365
2366/// A builder suitable for building and interacting with named pipes from the
2367/// client side.
2368///
2369/// See [`ClientOptions::open`].
2370#[derive(Debug, Clone)]
2371pub struct ClientOptions {
2372 generic_read: bool,
2373 generic_write: bool,
2374 security_qos_flags: u32,
2375 pipe_mode: PipeMode,
2376}
2377
2378impl ClientOptions {
2379 /// Creates a new named pipe builder with the default settings.
2380 ///
2381 /// ```
2382 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2383 ///
2384 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2385 ///
2386 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2387 /// // Server must be created in order for the client creation to succeed.
2388 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2389 /// let client = ClientOptions::new().open(PIPE_NAME)?;
2390 /// # Ok(()) }
2391 /// ```
2392 pub fn new() -> Self {
2393 Self {
2394 generic_read: true,
2395 generic_write: true,
2396 security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2397 | windows_sys::SECURITY_SQOS_PRESENT,
2398 pipe_mode: PipeMode::Byte,
2399 }
2400 }
2401
2402 /// If the client supports reading data. This is enabled by default.
2403 ///
2404 /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2405 ///
2406 /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2407 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2408 pub fn read(&mut self, allowed: bool) -> &mut Self {
2409 self.generic_read = allowed;
2410 self
2411 }
2412
2413 /// If the created pipe supports writing data. This is enabled by default.
2414 ///
2415 /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2416 ///
2417 /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2418 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2419 pub fn write(&mut self, allowed: bool) -> &mut Self {
2420 self.generic_write = allowed;
2421 self
2422 }
2423
2424 /// Sets qos flags which are combined with other flags and attributes in the
2425 /// call to [`CreateFile`].
2426 ///
2427 /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2428 /// calling this function would override that value completely with the
2429 /// argument specified.
2430 ///
2431 /// When `security_qos_flags` is not set, a malicious program can gain the
2432 /// elevated privileges of a privileged Rust process when it allows opening
2433 /// user-specified paths, by tricking it into opening a named pipe. So
2434 /// arguably `security_qos_flags` should also be set when opening arbitrary
2435 /// paths. However the bits can then conflict with other flags, specifically
2436 /// `FILE_FLAG_OPEN_NO_RECALL`.
2437 ///
2438 /// For information about possible values, see [Impersonation Levels] on the
2439 /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2440 /// automatically when using this method.
2441 ///
2442 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2443 /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2444 /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
2445 pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2446 // See: https://github.com/rust-lang/rust/pull/58216
2447 self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2448 self
2449 }
2450
2451 /// The pipe mode.
2452 ///
2453 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2454 /// documentation of what each mode means.
2455 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2456 self.pipe_mode = pipe_mode;
2457 self
2458 }
2459
2460 /// Opens the named pipe identified by `addr`.
2461 ///
2462 /// This opens the client using [`CreateFile`] with the
2463 /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2464 ///
2465 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2466 ///
2467 /// # Errors
2468 ///
2469 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2470 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2471 ///
2472 /// There are a few errors you need to take into account when creating a
2473 /// named pipe on the client side:
2474 ///
2475 /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2476 /// does not exist. Presumably the server is not up.
2477 /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2478 /// but the server is not currently waiting for a connection. Please see the
2479 /// examples for how to check for this error.
2480 ///
2481 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2482 /// [enabled I/O]: crate::runtime::Builder::enable_io
2483 /// [Tokio Runtime]: crate::runtime::Runtime
2484 ///
2485 /// A connect loop that waits until a pipe becomes available looks like
2486 /// this:
2487 ///
2488 /// ```no_run
2489 /// use std::time::Duration;
2490 /// use tokio::net::windows::named_pipe::ClientOptions;
2491 /// use tokio::time;
2492 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2493 ///
2494 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2495 ///
2496 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2497 /// let client = loop {
2498 /// match ClientOptions::new().open(PIPE_NAME) {
2499 /// Ok(client) => break client,
2500 /// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2501 /// Err(e) => return Err(e),
2502 /// }
2503 ///
2504 /// time::sleep(Duration::from_millis(50)).await;
2505 /// };
2506 ///
2507 /// // use the connected client.
2508 /// # Ok(()) }
2509 /// ```
2510 pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2511 // Safety: We're calling open_with_security_attributes_raw w/ a null
2512 // pointer which disables it.
2513 unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2514 }
2515
2516 /// Opens the named pipe identified by `addr`.
2517 ///
2518 /// This is the same as [`open`] except that it supports providing the raw
2519 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2520 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2521 ///
2522 /// # Safety
2523 ///
2524 /// The `attrs` argument must either be null or point at a valid instance of
2525 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2526 /// behavior is identical to calling the [`open`] method.
2527 ///
2528 /// [`open`]: ClientOptions::open
2529 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2530 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
2531 pub unsafe fn open_with_security_attributes_raw(
2532 &self,
2533 addr: impl AsRef<OsStr>,
2534 attrs: *mut c_void,
2535 ) -> io::Result<NamedPipeClient> {
2536 let addr = encode_addr(addr);
2537
2538 let desired_access = {
2539 let mut access = 0;
2540 if self.generic_read {
2541 access |= windows_sys::GENERIC_READ;
2542 }
2543 if self.generic_write {
2544 access |= windows_sys::GENERIC_WRITE;
2545 }
2546 access
2547 };
2548
2549 // NB: We could use a platform specialized `OpenOptions` here, but since
2550 // we have access to windows_sys it ultimately doesn't hurt to use
2551 // `CreateFile` explicitly since it allows the use of our already
2552 // well-structured wide `addr` to pass into CreateFileW.
2553 let h = windows_sys::CreateFileW(
2554 addr.as_ptr(),
2555 desired_access,
2556 0,
2557 attrs as *mut _,
2558 windows_sys::OPEN_EXISTING,
2559 self.get_flags(),
2560 null_mut(),
2561 );
2562
2563 if h == windows_sys::INVALID_HANDLE_VALUE {
2564 return Err(io::Error::last_os_error());
2565 }
2566
2567 if matches!(self.pipe_mode, PipeMode::Message) {
2568 let mode = windows_sys::PIPE_READMODE_MESSAGE;
2569 let result =
2570 windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2571
2572 if result == 0 {
2573 return Err(io::Error::last_os_error());
2574 }
2575 }
2576
2577 NamedPipeClient::from_raw_handle(h as _)
2578 }
2579
2580 fn get_flags(&self) -> u32 {
2581 self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2582 }
2583}
2584
/// How data written to a named pipe is framed.
///
/// Selected with [`ServerOptions::pipe_mode`] or [`ClientOptions::pipe_mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
    /// The pipe carries a plain stream of bytes: no boundary is kept between
    /// the bytes of one write operation and those of the next.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// The pipe carries a stream of messages: the bytes of each write
    /// operation form one message unit. Reading from the pipe returns
    /// [`ERROR_MORE_DATA`] whenever a message has not been read completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
2609
/// Which end of a named pipe instance a handle refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
    /// The handle is the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The handle is the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
2627
2628/// Information about a named pipe.
2629///
2630/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2631#[derive(Debug, Clone)]
2632#[non_exhaustive]
2633pub struct PipeInfo {
2634 /// Indicates the mode of a named pipe.
2635 pub mode: PipeMode,
2636 /// Indicates the end of a named pipe.
2637 pub end: PipeEnd,
2638 /// The maximum number of instances that can be created for this pipe.
2639 pub max_instances: u32,
2640 /// The number of bytes to reserve for the output buffer.
2641 pub out_buffer_size: u32,
2642 /// The number of bytes to reserve for the input buffer.
2643 pub in_buffer_size: u32,
2644}
2645
2646/// Encodes an address so that it is a null-terminated wide string.
2647fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2648 let len = addr.as_ref().encode_wide().count();
2649 let mut vec = Vec::with_capacity(len + 1);
2650 vec.extend(addr.as_ref().encode_wide());
2651 vec.push(0);
2652 vec.into_boxed_slice()
2653}
2654
2655/// Internal function to get the info out of a raw named pipe.
2656unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2657 let mut flags = 0;
2658 let mut out_buffer_size = 0;
2659 let mut in_buffer_size = 0;
2660 let mut max_instances = 0;
2661
2662 let result = windows_sys::GetNamedPipeInfo(
2663 handle as _,
2664 &mut flags,
2665 &mut out_buffer_size,
2666 &mut in_buffer_size,
2667 &mut max_instances,
2668 );
2669
2670 if result == 0 {
2671 return Err(io::Error::last_os_error());
2672 }
2673
2674 let mut end = PipeEnd::Client;
2675 let mut mode = PipeMode::Byte;
2676
2677 if flags & windows_sys::PIPE_SERVER_END != 0 {
2678 end = PipeEnd::Server;
2679 }
2680
2681 if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2682 mode = PipeMode::Message;
2683 }
2684
2685 Ok(PipeInfo {
2686 end,
2687 mode,
2688 out_buffer_size,
2689 in_buffer_size,
2690 max_instances,
2691 })
2692}